Шрифт:
Интервал:
Закладка:
Как только последний поток вызывает метод await, выполняется действие второго барьера и потоки освобождаются.
Синхронизатор Phaser аналогичен синхронизатору CyclicBarrier, за исключением того, что в CyclicBarrier количество участников-потоков фиксировано, а в Phaser новые участники-потоки могут регистрироваться и отменять регистрацию динамически.
В этом примере, мы создаем синхронизатор.
И при создании экземпляра Phaser из основного потока мы передаем 1 в качестве аргумента.
Это эквивалентно вызову метода register из текущего потока.
Мы делаем это, потому что, когда мы создаем три рабочих потока, основной поток является координатором, и поэтому Phaser должен иметь четыре зарегистрированных потока.
Далее создаются три потока, выполняющих задачу, в которую передается синхронизатор.
В этой задаче поток регистрируется для синхронизатора и вызывает метод arriveAndAwaitAdvance, который аналогичен методу await CyclicBarrier и собирает потоки у барьера, включая главный поток.
Как только последний поток вызывает метод arriveAndAwaitAdvance, включая главный поток, фаза завершается.
Отмена регистрации потока для синхронизатора производится методом arriveAndDeregister.
Здесь в первом вызове метод getPhase вернет 0, так как фаза после инициализации равна нулю.
А во втором вызове метод getPhase вернет 1.
Exchanger (обменник) позволяет обменяться данными между двумя потоками в определенной точке работы обоих потоков.
Обмен производится с помощью метода exchange синхронизатора Exchanger, который ожидает, когда другой поток достигнет этой точки обмена, этой точки кода, а затем передает указанный объект другому потоку, получая его объект взамен.
В этом примере создаются два потока, обрабатывающие задачу.
И в этой задаче вызывается метод exchange, который берет значение поля другого потока и присваивает его полю текущего потока.
Параллельные потоки Stream
Ранее мы уже познакомились с программным интерфейсом Stream API, который значительно упрощает работу с коллекциями данных, позволяя отфильтровать, преобразовать и объединить данные без их промежуточного сохранения.
Кроме того, Stream API не модифицирует обрабатываемую коллекцию данных, а позволяет создать на ее основе новую коллекцию данных после обработки.
Напомню, что операторы Stream API делятся на две группы:
Промежуточные операторы, которые обрабатывают поступающие элементы и возвращают стрим.
И терминальные операторы, которые обрабатывают элементы и завершают работу стрима, так что терминальный оператор в цепочке может быть только один.
Обработка коллекции в Stream API не начинается до тех пор, пока не будет вызван терминальный оператор, то есть обработка происходит от терминального оператора к источнику.
И стрим нельзя использовать повторно. Как только вы вызываете любую терминальную операцию, стрим закрывается.
Стрим потоки могут выполняться параллельно, чтобы увеличить производительность обработки большого количества элементов.
Параллельные потоки стрим созданы на основе фреймворка Fork/Join и используют общий пул потоков ForkJoinPool, создаваемый с помощью статического метода ForkJoinPool.commonPool.
Размер общего пула потоков зависит от количества доступных физических ядер процессора.
Под капотом параллельный Stream API обеспечивает работу с потоконебезопасными коллекциями, разбиение коллекции элементов на части, создание потоков и объединение частей вместе.
Потокобезопасность обработки коллекций здесь достигается за счет отсутствия модификации коллекции во время ее обработки.
При использовании параллельного стрима, элементы разбиваются (если это возможно) на несколько групп и обрабатываются в каждом потоке отдельно.
Затем на нужном этапе группы объединяются в одну для предоставления конечного результата.
Чтобы получить параллельный стрим, нужно либо вызвать метод parallelStream вместо метода stream, либо превратить обычный стрим в параллельный, вызвав промежуточный оператор parallel.
Параллельные потоки стрим могут дать прирост производительности в случае большого количества элементов на входе и наличия нескольких ядер процессора. В случае одного ядра их применение бессмыслено.
Если количество элементов небольшое, нужно учесть, что некоторые параллельные стрим операции, такие как reduce и collect, требуют дополнительных вычислений (операций объединения), которые не нужны при последовательном выполнении. Поэтому в случае небольшого количества элементов, последовательный стрим может выполнится быстрее, чем параллельный стрим.
Затраты на разбиение элементов, обработку в другом потоке и последующее их слияние могут быть больше, чем выполнение в одном потоке, в случае небольшого количества элементов и простых операций обработки элементов.
Если операции обработки элементов сильно загружают процессор, например, вычисление тригонометрических функций, имеет смысл применит параллельный стрим и для небольшого количества элементов.
То есть фактор, количество элементов умножить на стоимость обработки каждого элемента, должен быть большим для использования параллельных стрим потоков.
В параллельных стрим потоках лучше всего обрабатывать структуры несвязанных данных, например, ArrayList. LinkedList лучше не использовать, так как в последовательном списке все элементы связаны с предыдущими/последующими элементами. И такие данные трудно распараллелить.
Кроме того, так как все параллельные стрим операции используют один и тот же общий пул потоков ForkJoinPool, нужно избегать блокирования потоков или по крайней мере использовать ManagedBlocker.
Если мы хотим применить долго выполняющиеся операции над элементами коллекции, имеет смысл выделить вычисление стрима в отдельный пул потоков ForkJoinPool.
Здесь мы обертываем вычисление потока в отдельную задачу и передаем ее в отдельный пул потоков.
Если мы не уверены, что на каком-то этапе работы с параллельным стрим, он адекватно сможет выполнить какую-нибудь операцию, мы можем преобразовать этот стрим в последовательный с помощью вызова метода sequential.
Как правило, элементы передаются в поток в том же порядке, в котором они определены в источнике данных.
Сохранение порядка в параллельных потоках увеличивает издержки при выполнении.
Но если нам порядок не важен, то мы можем отключить его сохранение и тем самым увеличить производительность, использовав метод unordered.
В некоторых случаях сохранение порядка в параллельных стрим является важным, например, когда мы хотим отсортировать элементы и вывести их в печать.
В этом примере элементы будут разбиты на потоки, и операция forEach для любого данного элемента может выполняться в любое время и в любом потоке. Поэтому вывод будет неотсортированным.
Если использовать метод forEachOrdered, он заставит обрабатывать элементы по порядку, однако это уберет все преимущество параллельного вычисления.
Метод forEachOrdered обрабатывает элементы стрима в порядке, заданном его источником, независимо от того, выполняете ли вы стрим последовательно или параллельно.
Промежуточные операции подразделяются на